package com.wx.learn.flume;

import com.google.common.collect.ImmutableMap;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Custom Flume sink that drains events from its channel in batches and writes each batch to a
 * new file in a directory of the Hadoop {@link FileSystem}.
 *
 * <p>Configuration keys:
 * <ul>
 *   <li>{@code hdfs.path} - output directory; defaults to {@code /mysink}.</li>
 *   <li>{@code hdfs.batchSize} - maximum number of events written per file and per channel
 *       transaction; defaults to {@code 100}.</li>
 * </ul>
 *
 * <p>Every call to {@link #process()} runs exactly one channel transaction: the transaction is
 * committed only after the batch file has been written and closed, and is rolled back on any
 * failure so the events stay in the channel. Because the file is written before the commit, a
 * failure during commit can cause the same events to be written again on retry.
 */
public class MyHDFSSink extends AbstractSink implements Configurable {
    private static final String PATH_KEY = "hdfs.path";
    private static final String BATCH_SIZE_KEY = "hdfs.batchSize";
    private static final String DEFAULT_PATH = "/mysink";
    private static final int DEFAULT_BATCH_SIZE = 100;
    // The platform line separator is pure ASCII, so this encoding is lossless.
    private static final byte[] LINE_SEPARATOR =
            System.lineSeparator().getBytes(StandardCharsets.US_ASCII);

    private String path = DEFAULT_PATH;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private FileSystem fs;

    /**
     * Opens the file system and makes sure the configured output directory exists, then marks
     * the sink as started.
     *
     * @throws FlumeException if the file system cannot be reached or the directory cannot be
     *     created; the sink is not marked as started in that case
     */
    @Override
    public synchronized void start() {
        try {
            FileSystem fileSystem = FileSystem.get(new Configuration());
            Path outputDir = new Path(this.path);
            if (!fileSystem.exists(outputDir) && !fileSystem.mkdirs(outputDir)) {
                throw new IOException("mkdirs returned false for " + outputDir);
            }
            this.fs = fileSystem;
        } catch (IOException e) {
            throw new FlumeException("Unable to prepare output directory " + this.path, e);
        }
        // Only report the sink as started once its resources are actually usable.
        super.start();
    }

    /**
     * Takes up to {@code batchSize} events from the channel inside a single transaction and
     * writes them, one per line, to a new file named after the current time in milliseconds.
     *
     * @return {@link Status#READY} if at least one event was written, {@link Status#BACKOFF} if
     *     the channel had no events
     * @throws EventDeliveryException if taking or writing the events fails; the transaction is
     *     rolled back first so the events remain in the channel
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = this.getChannel();
        Transaction ts = ch.getTransaction();

        ts.begin();
        try {
            List<Event> batch = new ArrayList<>(this.batchSize);
            while (batch.size() < this.batchSize) {
                Event event = ch.take();
                if (event == null) {
                    // Channel is drained; do not spin waiting for more.
                    break;
                }
                batch.add(event);
            }

            if (batch.isEmpty()) {
                ts.commit();
                return Status.BACKOFF;
            }

            writeBatch(batch);
            ts.commit();
            return Status.READY;
        } catch (Throwable t) {
            ts.rollback();
            if (t instanceof Error) {
                throw (Error) t;
            }
            if (t instanceof EventDeliveryException) {
                throw (EventDeliveryException) t;
            }
            throw new EventDeliveryException("Failed to deliver events to " + this.path, t);
        } finally {
            // Always close, otherwise the next begin() on this thread finds a stale transaction.
            ts.close();
        }
    }

    /**
     * Writes the raw body of every event followed by a line separator to a fresh file.
     *
     * <p>The file is created with {@code overwrite = false} so that two batches produced within
     * the same millisecond cannot silently replace one another; the second create fails and the
     * batch is retried. A failure part-way through may leave a partial file behind.
     */
    private void writeBatch(List<Event> batch) throws IOException {
        Path file = new Path(this.path + "/" + System.currentTimeMillis());
        try (FSDataOutputStream out = this.fs.create(file, false)) {
            for (Event event : batch) {
                byte[] body = event.getBody();
                if (body != null) {
                    // Bytes are written untouched, so no charset round-trip can corrupt them.
                    out.write(body);
                }
                out.write(LINE_SEPARATOR);
            }
        }
    }

    /**
     * Reads the output directory and batch size from the sink's configuration.
     *
     * @param context the sink configuration supplied by Flume
     * @throws IllegalArgumentException if {@code hdfs.batchSize} is not positive
     */
    @Override
    public void configure(Context context) {
        this.path = context.getString(PATH_KEY, DEFAULT_PATH);
        int configuredBatchSize = context.getInteger(BATCH_SIZE_KEY, DEFAULT_BATCH_SIZE);
        if (configuredBatchSize <= 0) {
            throw new IllegalArgumentException(
                    BATCH_SIZE_KEY + " must be positive but was " + configuredBatchSize);
        }
        this.batchSize = configuredBatchSize;
    }
}
